笔记摘要

这里介绍了java5中的线程锁技术:Lock和Condition,实现线程间的通信,其中的读锁和写锁的使用通过一个缓存系统进行了演示,对于Condition的应用通过一个阻塞队列进行演示。

线程锁技术:Lock & Condition 实现线程同步通信所属包:java.util.concurrent.locks

线程锁 说明
Synchronized 同步方法,锁对象是this;同步静态方法,锁对象是字节码.class;同步代码块,锁对象是任意对象,但必须是同一个对象
Lock 同步锁接口
ReentrantLock lock(),unlock(),newCondition()
ReadWriteLock 读写锁接口
ReentrantReadWriteLock readLock()获取读锁,writeLock()获取写锁
Condition 线程间通信 await()等待 signal()唤醒

1. Lock

Lock比传统线程模型中的synchronized方式更加面向对象,相对于synchronized 方法和语句它具有更广泛的锁定操作,此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。

于现实生活中类似,锁本身也是一个对象。两个线程执行的代码片段要实现同步互斥的结果,它们必须用同一个Lock对象,锁是上在代表要操作的资源的类的内部方法中,而不是线程代码中。

ReentrantLock

方法声明 功能描述
lock() 获取锁
tryLock() 尝试获取锁
unock() 释放锁
newCondition() 获取锁的Condition

常用形式如下

  1. Lock lock = new ReentrantLock();
  2. public void doSth(){
  3. lock.lock();
  4. try {
  5. // 执行某些操作
  6. }finally {
  7. lock.unlock();
  8. }
  9. }

读写锁

分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,写锁与写锁互斥,这是由JVM自己控制的。你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

读写锁的使用情景:

  • 如果代码只读数据,就可以很多人共同读取,但不能同时写。
  • 如果代码修改数据,只能有一个人在写,且不能同时读数据。

API中ReentrantReadWriteLock类提供的一个读写锁缓存示例:

  1. class CachedData {
  2.   Object data;
  3. volatile boolean cacheValid;
  4.   ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
  5.   void processCachedData() {
  6.    rwl.readLock().lock();
  7.    if (!cacheValid) {
  8.    // Must release read lock before acquiring write lock
  9.    rwl.readLock().unlock();
  10.    rwl.writeLock().lock();
  11.    // Recheck state because another thread might have acquired
  12.     // write lock and changed state before we did.
  13.    if (!cacheValid) {
  14.    data = ...
  15.    cacheValid = true;
  16.    }
  17.    // Downgrade by acquiring read lock before releasing write lock
  18. rwl.readLock().lock();
  19.    rwl.writeLock().unlock(); // Unlock write, still hold read
  20.    }
  21.    use(data);
  22.    rwl.readLock().unlock();
  23.   }
  24. }

读写锁的应用:编写一个缓存系统

注解:为了避免线程的安全问题,synchronized和ReadWriteLock都可以,synchronized也防止了并发读取,性能较低有一个线程先进去,开始读取数据,进行判断,发现没有数据,其他线程就没有必要进去了,就释放读锁,加上写锁,去查找数据写入,为了避免写入的其他对象等待,再做一次判断,数据写入完成后,释放写锁,上读锁,防止写入,还原原来的状态。

两次判断:第一次为了写入数据,所以释放读锁,上写锁。第二次为了防止阻塞的线程重复写入

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. import java.util.concurrent.locks.ReadWriteLock;
  4. import java.util.concurrent.locks.ReentrantReadWriteLock;
  5. public class CacheDemo {
  6. //定义一个map用于缓存对象
  7. private Map<String, Object> cache = new HashMap<String, Object>();
  8. //获取一个读写锁对象
  9. private ReadWriteLock rwl = new ReentrantReadWriteLock();
  10. //带有缓存的获取指定值的方法
  11. public Object getData(String key){
  12. rwl.readLock().lock(); //上读锁
  13. Object value = null;
  14. try{
  15. value = cache.get(key); //获取要查询的值
  16. if(value == null){ //线程出现安全问题的地方
  17. rwl.readLock().unlock(); //没有数据,释放读锁,上写锁
  18. // 多个线程去上写锁,第一个上成功后,其他线程阻塞,第一个线程开始执行下面的代码,最后
  19. // 释放写锁后,后面的线程继续上写锁,为了避免后面的线程重复写入,进行二次判断
  20. rwl.writeLock().lock();
  21. try{
  22. if(value==null){ //二次判断,防止其他线程重复写数据
  23. value = "aaaa"; //实际是去查询数据库
  24. }
  25. }finally{
  26. rwl.writeLock().unlock(); //写完数据,释放写锁
  27. }
  28. rwl.readLock().lock(); //恢复读锁
  29. }
  30. }finally{
  31. rwl.readLock().unlock(); //最终释放读锁
  32. }
  33. return value; //返回获取到的值
  34. }
  35. }

虚假唤醒:用while代替if

  1. Lock lock = new ReentrantLock();
  2. try {
  3. lock.lock();
  4. //需要加锁的代码
  5. }finally {
  6. lock.unlock();
  7. }

读写锁测试

  1. public class ReadWriteLockTest {
  2. public static void main(String[] args) {
  3. final Queue3 q3 = new Queue3();
  4. for(int i=0;i<3;i++)
  5. {
  6. new Thread(){
  7. public void run(){
  8. while(true){
  9. q3.get();
  10. }
  11. }
  12. }.start();
  13. new Thread(){
  14. public void run(){
  15. while(true){
  16. q3.put(new Random().nextInt(10000));
  17. }
  18. }
  19. }.start();
  20. }
  21. }
  22. }
  23. class Queue3{
  24. private Object data = null;
  25. ReadWriteLock rwl = new ReentrantReadWriteLock
  26. ();
  27. public void get(){
  28. rwl.readLock().lock();
  29. try {
  30. System.out.println(Thread.currentThread().getName() + " be ready to read data!");
  31. Thread.sleep((long)(Math.random()*1000));
  32. System.out.println(Thread.currentThread().getName() + "have read data :" + data);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }finally{
  36. rwl.readLock().unlock();
  37. }
  38. }
  39. public void put(Object data){
  40. rwl.writeLock().lock();
  41. try {
  42. System.out.println(Thread.currentThread().getName() + " be ready to write data!");
  43. Thread.sleep((long)(Math.random()*1000));
  44. this.data = data;
  45. System.out.println(Thread.currentThread().getName() + " have write data: " + data);
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }finally{
  49. rwl.writeLock().unlock();
  50. }
  51. }
  52. }
  1. Thread-0 be ready to read data!
  2. Thread-2 be ready to read data!
  3. Thread-4 be ready to read data!
  4. Thread-0have read data :null
  5. Thread-2have read data :null
  6. Thread-4have read data :null
  7. Thread-5 be ready to write data!
  8. Thread-5 have write data: 7975
  9. Thread-5 be ready to write data!
  10. Thread-5 have write data: 9832
  11. Thread-3 be ready to write data!
  12. Thread-3 have write data: 2813
  13. Thread-3 be ready to write data!
  14. Thread-3 have write data: 7998
  15. Thread-1 be ready to write data!
  16. Thread-1 have write data: 6737
  17. Thread-1 be ready to write data!
  18. ...

2. Condition

用于实现线程间的通信,是为了解决Object.wait()、nitify()、notifyAll()难以使用的问题

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法wait和notify的使用

一个锁内部可以有多个Condition,即有多路等待通知,传统的线程机制中一个监视器对象上只能有一路等待和通知,要想实现多路等待和通知,必须嵌套使用多个同步监视器对象。使用一个监视器往往会产生顾此失彼的情况。

在等待 Condition 时,允许发生“虚假唤醒”,这通常作为对基础平台语义的让步。对于大多数应用程序,这带来的实际影响很小,因为 Condition 应该总是在一个循环中被等待,并测试正被等待的状态声明。某个实现可以随意移除可能的虚假唤醒,但建议应用程序程序员总是假定这些虚假唤醒可能发生,因此总是在一个循环中等待。

方法声明 功能描述
await() 线程等待
await(long time, TimeUnit unit) 线程等待特定的时间,超过等待时间则为超时
signal() 随机唤醒某个等待线程
signalAll() 唤醒所有等待中的线程

Condition的应用:阻塞队列(使用了两个监视器)

说明:该应用是 java.util.concurrent.locks包中Condition接口中的示例代码。使用了两个Condition分别用于管理取数据的线程,和存数据的线程,这样就可以明确的唤醒需要的一类线程,如果使用一个Condition,当队列满了之后,唤醒的并不一定就是取数据的线程

  1. class BoundedBuffer {
  2. final Lock lock = new ReentrantLock();
  3. final Condition notFull = lock.newCondition();
  4. final Condition notEmpty = lock.newCondition();
  5. final Object[] items = new Object[100];
  6. int putptr, takeptr, count;
  7. public void put(Object x) throws InterruptedException {
  8. lock.lock();
  9. try {
  10. while (count == items.length) //循环判断队列是否已存满
  11. notFull.await(); //如果队列存满了,则要存入数据的线程等待
  12. items[putptr] = x;
  13. if (++putptr == items.length) putptr = 0;//当队列放满,指针回到0
  14. ++count; //添加了一个数据
  15. notEmpty.signal(); //队列中有数据了,所以就唤醒取数据的线程
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public Object take() throws InterruptedException {
  21. lock.lock();
  22. try {
  23. while (count == 0) //循环判断,队列是否有空位
  24. notEmpty.await(); //要取的线程等待
  25. Object x = items[takeptr];
  26. if (++takeptr == items.length) takeptr = 0;
  27. --count; //取走一个,说明队列有空闲的位置,
  28. notFull.signal(); //所以通知存入的线程
  29. return x;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }
  34. }

Condition测试

  1. public class ConditionCommunication {
  2. public static void main(String[] args) {
  3. final Business business = new Business();
  4. new Thread(
  5. new Runnable() {
  6. @Override
  7. public void run() {
  8. for(int i=1;i<=5;i++){
  9. business.sub(i);
  10. }
  11. }
  12. }
  13. ).start();
  14. for(int i=1;i<=5;i++){
  15. business.main(i);
  16. }
  17. }
  18. class Business {
  19. Lock lock = new ReentrantLock();
  20. Condition condition = lock.newCondition();
  21. private boolean bShouldSub = true;
  22. public void sub(int i){
  23. lock.lock();
  24. try{
  25. while(!bShouldSub){
  26. try {
  27. condition.await();
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. for(int j=1;j<=2;j++){
  33. System.out.println("sub thread sequence of " + j + ",loop of " + i);
  34. }
  35. bShouldSub = false;
  36. condition.signal();
  37. }finally{
  38. lock.unlock();
  39. }
  40. }
  41. public void main(int i){
  42. lock.lock();
  43. try{
  44. while(bShouldSub){
  45. try {
  46. condition.await();
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. for(int j=1;j<=4;j++){
  52. System.out.println("main thread sequence of " + j + ",loop of " + i);
  53. }
  54. bShouldSub = true;
  55. condition.signal();
  56. }finally{
  57. lock.unlock();
  58. }
  59. }
  60. }
  61. }

输出结果

  1. sub thread sequence of 1,loop of 1
  2. sub thread sequence of 2,loop of 1
  3. main thread sequence of 1,loop of 1
  4. main thread sequence of 2,loop of 1
  5. main thread sequence of 3,loop of 1
  6. main thread sequence of 4,loop of 1
  7. sub thread sequence of 1,loop of 2
  8. sub thread sequence of 2,loop of 2
  9. main thread sequence of 1,loop of 2
  10. main thread sequence of 2,loop of 2
  11. main thread sequence of 3,loop of 2
  12. main thread sequence of 4,loop of 2
  13. sub thread sequence of 1,loop of 3
  14. sub thread sequence of 2,loop of 3
  15. main thread sequence of 1,loop of 3
  16. main thread sequence of 2,loop of 3
  17. main thread sequence of 3,loop of 3
  18. main thread sequence of 4,loop of 3
  19. sub thread sequence of 1,loop of 4
  20. sub thread sequence of 2,loop of 4
  21. main thread sequence of 1,loop of 4
  22. main thread sequence of 2,loop of 4
  23. main thread sequence of 3,loop of 4
  24. main thread sequence of 4,loop of 4
  25. sub thread sequence of 1,loop of 5
  26. sub thread sequence of 2,loop of 5
  27. main thread sequence of 1,loop of 5
  28. main thread sequence of 2,loop of 5
  29. main thread sequence of 3,loop of 5
  30. main thread sequence of 4,loop of 5

使用ReentrantLock和Condition实现一个简单的阻塞队列MyArrayBlockingQueue,如果调用take方法时集合中没有数据,那么调用线程就阻塞;如果调用put方法时,集合数据已满,那么也会引起调用线程阻塞。但是,这两个阻塞的条件时不同的,分别为为notFull和notEmpty

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class MyArrayBlockingQueue<T> {
  5. // 数据数组
  6. private final T[] items;
  7. // 锁
  8. private final Lock lock = new ReentrantLock();
  9. // 队满的条件
  10. private Condition notFull = lock.newCondition();
  11. // 队空条件
  12. private Condition notEmpty = lock.newCondition();
  13. // 头部索引
  14. private int head;
  15. // 尾部索引
  16. private int tail;
  17. // 数据的个数
  18. private int count;
  19. public MyArrayBlockingQueue(int maxSize) {
  20. items = (T[]) new Object[maxSize];
  21. }
  22. public MyArrayBlockingQueue() {
  23. this(10);
  24. }
  25. public void put(T t) {
  26. lock.lock();
  27. try {
  28. while (count == getCapacity()) {
  29. System.out.println("数据已满,等待");
  30. notFull.await();
  31. }
  32. items[tail] = t;
  33. if (++tail == getCapacity()) {
  34. tail = 0;
  35. }
  36. ++count;
  37. notEmpty.signalAll(); // 唤醒等待数据的线程
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. } finally {
  41. lock.unlock();
  42. }
  43. }
  44. public T take() {
  45. lock.lock();
  46. try {
  47. while (count == 0) {
  48. System.out.println("还没有数据,请等待");
  49. notEmpty.await();
  50. }
  51. T ret = items[head];
  52. items[head] = null;
  53. if (++head == getCapacity()) {
  54. head = 0;
  55. }
  56. --count;
  57. notFull.signalAll(); // 唤醒添加数据的线程
  58. return ret;
  59. } catch (InterruptedException e) {
  60. e.printStackTrace();
  61. } finally {
  62. lock.unlock();
  63. }
  64. return null;
  65. }
  66. public int getCapacity() {
  67. return items.length;
  68. }
  69. public int size() {
  70. lock.lock();
  71. try {
  72. return count;
  73. } finally {
  74. lock.unlock();
  75. }
  76. }
  77. public static void main(String[] args) {
  78. MyArrayBlockingQueue<Integer> aQueue = new MyArrayBlockingQueue<Integer>();
  79. aQueue.put(3);
  80. aQueue.put(24);
  81. for (int i = 0; i < 5; i++) {
  82. System.out.println(aQueue.take());
  83. }
  84. }
  85. }

输出结果

  1. 3
  2. 24
  3. 还没有数据,请等待

3. Condition练习

一共有3个线程,两个子线程先后循环2次,接着主线程循环3次,接着又回到两 个子线程先后循环2次,再回到主线程又循环3次,如此循环5次。

思路:老二先执行,执行完唤醒老三,老三执行完唤醒老大,老大执行完唤醒老二,以此循环,所以定义3个Condition对象和一个执行标识即可

示例出现的问题:两个文件中有同名类的情况

解决方案:可以将一个文件中的那个同名外部类放进类中,但是静态不能创建内部类的实例对象,所以需要加上static,这样两个类的名称就不一样了。 一个是原来的类名,一个是在自己类名前面加上外部类的类名。

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. public class ThreeConditionCommunication {
  5. public static void main(String[] args){
  6. final Business business = new Business();
  7.    //创建并启动子线程老二
  8. new Thread(new Runnable(){
  9. @Override
  10. public void run() {
  11. for(int i=1;i<=5;i++){
  12. business.sub2(i);
  13. }
  14. }
  15. }).start();
  16.    //创建并启动子线程老三
  17. new Thread(new Runnable(){
  18. @Override
  19. public void run() {
  20. for(int i=1;i<=5;i++){
  21. business.sub3(i);
  22. }
  23. }
  24. }).start();
  25. //主线程
  26. for(int i=1;i<=5;i++){
  27. business.main(i);
  28. }
  29. }
  30. static class Business{
  31. Lock lock = new ReentrantLock();
  32. Condition condition1 = lock.newCondition();
  33. Condition condition2 = lock.newCondition();
  34. Condition condition3 = lock.newCondition();
  35. //定义一个变量来决定线程的执行权
  36. private int ShouldSub = 1;
  37. public void sub2(int i){
  38. //上锁,不让其他线程执行
  39. lock.lock();
  40. try{
  41. if(ShouldSub != 2){ //如果不该老二执行,就等待
  42. try {
  43. condition2.await();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. for(int j=1;j<=2;j++){
  49. System.out.println("sub thread sequence of"+i+",loop of "+j);
  50. }
  51. ShouldSub = 3; //准备让老三执行
  52. condition3.signal(); //唤醒老三
  53. }finally{
  54. lock.unlock();
  55. }
  56. }
  57. public void sub3(int i){
  58. lock.lock();
  59. try{
  60. if(ShouldSub != 3){
  61. try {
  62. condition3.await();
  63. } catch (InterruptedException e) {
  64. e.printStackTrace();
  65. }
  66. }
  67. for(int j=1;j<=2;j++){
  68. System.out.println("sub2 thread sequence of"+i+",loop of "+j);
  69. }
  70. ShouldSub = 1; //准备让老大执行
  71. condition1.signal(); //唤醒老大
  72. }finally{
  73. lock.unlock();
  74. }
  75. }
  76.    //主线程
  77. public void main(int i){
  78. lock.lock();
  79. try{
  80. if(ShouldSub!=1){
  81. try {
  82. condition1.await();
  83. } catch (InterruptedException e) {
  84. e.printStackTrace();
  85. }
  86. }
  87. for(int j=1;j<=3;j++){
  88. System.out.println("main thread sequence of"+i+", loop of "+j);
  89. }
  90. ShouldSub = 2; //准备让老二执行
  91. condition2.signal(); //唤醒老二
  92. }finally{
  93. lock.unlock();
  94. }
  95. }
  96. }
  97. }

输出结果

  1. main thread sequence of1, loop of 1
  2. main thread sequence of1, loop of 2
  3. main thread sequence of1, loop of 3
  4. sub thread sequence of1,loop of 1
  5. sub thread sequence of1,loop of 2
  6. sub2 thread sequence of1,loop of 1
  7. sub2 thread sequence of1,loop of 2
  8. main thread sequence of2, loop of 1
  9. main thread sequence of2, loop of 2
  10. main thread sequence of2, loop of 3
  11. sub thread sequence of2,loop of 1
  12. sub thread sequence of2,loop of 2
  13. sub2 thread sequence of2,loop of 1
  14. sub2 thread sequence of2,loop of 2
  15. main thread sequence of3, loop of 1
  16. main thread sequence of3, loop of 2
  17. main thread sequence of3, loop of 3
  18. sub thread sequence of3,loop of 1
  19. sub thread sequence of3,loop of 2
  20. sub2 thread sequence of3,loop of 1
  21. sub2 thread sequence of3,loop of 2
  22. main thread sequence of4, loop of 1
  23. main thread sequence of4, loop of 2
  24. main thread sequence of4, loop of 3
  25. sub thread sequence of4,loop of 1
  26. sub thread sequence of4,loop of 2
  27. sub2 thread sequence of4,loop of 1
  28. sub2 thread sequence of4,loop of 2
  29. main thread sequence of5, loop of 1
  30. main thread sequence of5, loop of 2
  31. main thread sequence of5, loop of 3
  32. sub thread sequence of5,loop of 1
  33. sub thread sequence of5,loop of 2
  34. sub2 thread sequence of5,loop of 1
  35. sub2 thread sequence of5,loop of 2

4. 多路等待和通知

  1. class BoundedBuffer {
  2. final Lock lock = new ReentrantLock();
  3. final Condition notFull = lock.newCondition();
  4. final Condition notEmpty = lock.newCondition();
  5. final Object[] items = new Object[100];
  6. int putptr, takeptr, count;
  7. public void put(Object x) throws InterruptedException {
  8. lock.lock();
  9. try {
  10. while (count == items.length)
  11. notFull.await();
  12. items[putptr] = x;
  13. if (++putptr == items.length) putptr = 0;
  14. ++count;
  15. notEmpty.signal();
  16. } finally {
  17. lock.unlock();
  18. }
  19. }
  20. public Object take() throws InterruptedException {
  21. lock.lock();
  22. try {
  23. while (count == 0)
  24. notEmpty.await();
  25. Object x = items[takeptr];
  26. if (++takeptr == items.length) takeptr = 0;
  27. --count;
  28. notFull.signal();
  29. return x;
  30. } finally {
  31. lock.unlock();
  32. }
  33. }
  34. }